package com.lq.rabbitmq.deom04;

import com.lq.rabbitmq.utile.RabbitmqUtile;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Demonstrates the three RabbitMQ publisher-confirm strategies:
 * <ol>
 *   <li>confirming every message individually,</li>
 *   <li>confirming messages in batches,</li>
 *   <li>confirming asynchronously through callbacks.</li>
 * </ol>
 *
 * <p>Each demo declares its own randomly named queue and publishes
 * {@link #MESSAGE_COUNT} messages to it through the default exchange.
 *
 * <p>NOTE(review): none of the demos closes the channel obtained from
 * {@code RabbitmqUtile.createConnection()}; closing it right after publishing
 * would cut off confirms that are still in flight in the asynchronous demo.
 */
public class ConfigMessage {
    /** Number of messages each demo publishes. */
    private static final int MESSAGE_COUNT = 1000;

    /**
     * Entry point. Uncomment exactly one of the calls below to run that strategy;
     * the timings next to each call are the ones recorded by the original author.
     *
     * @param args unused
     * @throws IOException          declared for the demo methods that can be enabled here
     * @throws InterruptedException declared for the demo methods that can be enabled here
     */
    public static void main(String[] args) throws IOException, InterruptedException {

        // 1. Individual confirms      elapsed: 38 s
        //  SingleModeConfirmation();
        // 2. Batch confirms           elapsed: 0.476 s
        //BatchConfirmationMode();
        // 3. Asynchronous confirms    elapsed: 0.109 s
        //  AsynchronousConfirmationMode();

    }

    /**
     * Publishes the messages one at a time and blocks after each publish until the
     * broker has confirmed it.
     *
     * @throws IOException          if declaring the queue, enabling confirms or publishing fails
     * @throws InterruptedException if the thread is interrupted while waiting for a confirm
     */
    public static void SingleModeConfirmation() throws IOException, InterruptedException {
        // presumably opens a connection and returns a fresh channel on it — verify in RabbitmqUtile
        Channel channel = RabbitmqUtile.createConnection();
        // Declare a durable, non-exclusive, non-auto-delete queue with a random name.
        String queueName = String.valueOf(UUID.randomUUID());
        channel.queueDeclare(queueName, true, false, false, null);
        // Put the channel into publisher-confirm mode.
        channel.confirmSelect();

        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String body = String.valueOf(i);
            channel.basicPublish("", queueName, null, body.getBytes(StandardCharsets.UTF_8));
            // waitForConfirms() returns false when the broker nack-ed the message.
            if (channel.waitForConfirms()) {
                System.out.println("信息发送成功");
            } else {
                System.out.println("信息发送失败");
            }
        }
        printElapsed(begin);
    }

    /**
     * Publishes the messages and blocks for confirms once per batch of 100 messages,
     * plus once more at the end for any incomplete final batch.
     *
     * @throws IOException          if declaring the queue, enabling confirms or publishing fails
     * @throws InterruptedException if the thread is interrupted while waiting for confirms
     */
    public static void BatchConfirmationMode() throws IOException, InterruptedException {
        // presumably opens a connection and returns a fresh channel on it — verify in RabbitmqUtile
        Channel channel = RabbitmqUtile.createConnection();
        // Declare a durable, non-exclusive, non-auto-delete queue with a random name.
        String queueName = String.valueOf(UUID.randomUUID());
        channel.queueDeclare(queueName, true, false, false, null);
        // Put the channel into publisher-confirm mode.
        channel.confirmSelect();

        long begin = System.currentTimeMillis();
        int batchSize = 100;
        // Messages published since the last confirm wait.
        int outstanding = 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String body = String.valueOf(i);
            channel.basicPublish("", queueName, null, body.getBytes(StandardCharsets.UTF_8));
            outstanding++;
            // Counting published messages (rather than testing i % batchSize == 0) avoids
            // a pointless wait after the very first message.
            if (outstanding == batchSize) {
                awaitBatchConfirm(channel);
                outstanding = 0;
            }
        }
        // Confirm the trailing messages that did not fill a whole batch.
        if (outstanding > 0) {
            awaitBatchConfirm(channel);
        }
        printElapsed(begin);

    }

    /**
     * Publishes the messages without blocking and handles broker confirms in callbacks.
     *
     * <p>The reported time covers publishing only; confirms may still arrive after
     * this method has returned.
     *
     * @throws IOException if declaring the queue, enabling confirms or publishing fails
     */
    public static void AsynchronousConfirmationMode() throws IOException {
        // presumably opens a connection and returns a fresh channel on it — verify in RabbitmqUtile
        Channel channel = RabbitmqUtile.createConnection();
        // Declare a durable, non-exclusive, non-auto-delete queue with a random name.
        String queueName = String.valueOf(UUID.randomUUID());
        channel.queueDeclare(queueName, true, false, false, null);
        // Put the channel into publisher-confirm mode.
        channel.confirmSelect();

        long begin = System.currentTimeMillis();

        // Unconfirmed messages keyed by publish sequence number. The map is concurrent
        // because the callbacks touch it while this method is still publishing, and
        // sorted so that a cumulative confirm can drop every entry up to a given tag.
        ConcurrentSkipListMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

        // tag is the delivery tag (publish sequence number) being confirmed;
        // multiple == true means every message up to and including tag is covered.
        ConfirmCallback ackCallback = (tag, multiple) -> {
            if (multiple) {
                outstanding.headMap(tag, true).clear();
            } else {
                outstanding.remove(tag);
            }
            System.out.println("确认发送消息:" + tag);
        };
        // Nack-ed messages are reported but deliberately left in the map.
        ConfirmCallback nackCallback = (tag, multiple) -> {
            if (multiple) {
                outstanding.headMap(tag, true)
                        .forEach((seqNo, body) -> System.out.println("未确认发送消息:" + body));
            } else {
                System.out.println("未确认发送消息:" + outstanding.get(tag));
            }
        };

        // Register the asynchronous ack / nack handlers.
        channel.addConfirmListener(ackCallback, nackCallback);

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String body = String.valueOf(i);
            // Read the sequence number and record the message BEFORE publishing:
            // after basicPublish the counter already points at the next message,
            // and the confirm could arrive before the entry exists.
            long seqNo = channel.getNextPublishSeqNo();
            outstanding.put(seqNo, body);
            channel.basicPublish("", queueName, null, body.getBytes(StandardCharsets.UTF_8));
        }
        printElapsed(begin);
    }

    /** Blocks until all messages published so far are confirmed and reports a nack. */
    private static void awaitBatchConfirm(Channel channel) throws InterruptedException {
        if (!channel.waitForConfirms()) {
            System.out.println("批量确认失败，存在未确认的消息");
        }
    }

    /** Prints the time elapsed since {@code beginMillis} as fractional seconds. */
    private static void printElapsed(long beginMillis) {
        long end = System.currentTimeMillis();
        System.out.println("耗时：" + (double) (end - beginMillis) / 1000 + "秒");
    }

}
